Celery
author::牛哄哄的celery
一、Celery
1 定义
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成:
- 消息中间件(message broker):Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括RabbitMQ,Redis等等任务执行单元。
- 人物执行单元(worker):Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
- 人物执行结果存储(task result store):Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等
消息处理顺序:
User
:生产者,产生消息的一方。如:Django。
message broker
:消息中间件,管理消息的分发。如:RabbitMQ、Redis。
worker
:执行单元,异步任务的执行者。如:Celery。
task result store
:存储结果单元,存储处理后的结果,如:Redis。
Celery特点:
Simple(简单):Celery 使用和维护都非常简单,并且不需要配置文件。
Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
2 安装
安装:pip install -U Celery
使用Redis作为Broker时需要额外安装:celery-with-redis
3 简单使用
异步任务不是直接运行,而是用命令执行
启动:celery -A celery_task worker -l info -P eventlet
:
-A
:对应的py文件
-l
:指定日志模式
-P
:指定模块,此处是告知celery用eventlet开启协程。
-c
:并发数量(-c 10
)
下述案例执行后可以观察到:两个任务几乎同时执行,同时过了5秒后两个任务几乎同时结束,说明任务是异步执行的。
from celery_task import send_email, send_msg
result = send_email.delay("yuan") print(result.id)
result = send_msg.delay("msg") print(result.id)
|
import celery import time
broker='redis://127.0.0.1:6379/1' backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test', broker=broker, backend=backend)
@app.task def send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"
@app.task def send_msg(name): print("向%s发送消息..."%name) time.sleep(5) print("向%s发送消息完成"%name) return "ok"
|
from celery.result import AsyncResult
from celery_task import cel
async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)
if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
|
4 模块化
为了更好的使用celery,我们可以将celery的生产者解耦开来。先创建一个celery_tasks
文件夹,然后在文件夹下创建如下文件:
celery.py
:celery配置文件
taskxx.py
:任务文件
check_result.py
:结果检测文件
创建好后用命令启动,同时执行生产者代码即可。
from celery import Celery
app = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01', 'celery_tasks.task02' ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
|
import time from celery_tasks.celery import app
@cel.task def send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"
|
import time from celery_tasks.celery import app
@cel.task def send_msg(name): print("向%s发送消息..."%name) time.sleep(5) print("向%s发送消息完成"%name) return "ok"
|
from celery.result import AsyncResult from celery_task import app
'''验证任务的执行状态的'''
def check_task_status(task_id): ''' 任务的执行状态: PENDING :等待执行 STARTED :开始执行 RETRY :重新尝试执行 SUCCESS :执行成功 FAILURE :执行失败 :param task_id: :return: ''' result = AsyncResult(id=task_id, app=app) dic = { 'code': 400, 'type': result.status, 'message': '', 'data': '',
} if result.status == 'PENDING': dic['message'] = '任务等待中' elif result.status == 'STARTED': dic['message'] = '任务开始执行' elif result.status == 'RETRY': dic['message'] = '任务重新尝试执行' elif result.status == 'FAILURE': dic['message'] = '任务执行失败了' elif result.status == 'SUCCESS': result = result.get() dic['message'] = '任务执行成功' dic['data'] = result dic['code'] = 200 return dic
|
5 简单定时任务
定义生产者任务为定时任务,此时调用生产者任务的函数不再是delay
,而是apply_async
。此时该任务也不是个异步任务,而是一个定时任务。
from celery_task import send_email from datetime import datetime, timedelta
current_time = datetime.now() delayed_time = current_time + timedelta(seconds=15) time_utc = datetime.utcfromtimestamp(delayed_time.timestamp()) print(time_utc)
result = send_email.apply_async(args=["alex",], eta=time_utc)
print(result.id)
|
6 模块化定时任务
celery_tasks
文件夹:
celery.py
:需要新增配置
taskxx.py
:不变
check_result.py
:不变
配置好了以后,需要通过celery beat
命令插入任务,从而定时性执行add-every-10-seconds
。
启动定时任务:celery -A celery_task beat
。启动后每隔设定的时间会向数据库中加入任务。
之后再使用celery -A celery_task worker -l info -P eventlet
查看结果。使用前,可能数据库中会有之前的残余任务,如果确定是不需要的任务就先删除之前的任务。
注意:一定要带-P eventlet
。不开协程会一直卡主。
from datetime import timedelta from celery import Celery from celery.schedules import crontab
import redis r = redis.Redis(host="127.0.0.1", port=6379, db=10)
r.flushdb()
app = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_tasks.task01', 'celery_tasks.task02', ])
app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False
app.conf.beat_schedule = { 'add-every-10-seconds': { 'task': 'celery_tasks.task01.send_email', 'schedule': timedelta(seconds=6), 'args': ('张三',) }, }
|
二、Django使用Celery
1 基础配置
Django使用celery
在Django中,可以新建一个文件夹专门处理异步任务,和app同级,命名如下:
my_celery(所有celery任务):
- config.py(配置文件)
- main.py(基本文件)
- email(处理邮箱):
- sms(处理信息):
broker_url = "redis://127.0.0.1:6379/14" result_backend = "redis://127.0.0.1:6379/15"
|
import os from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xxxx.settings')
app = Celery("sms")
app.config_from_object("my_celery.config")
app.autodiscover_tasks(["my_celery.sms",])
app.send_task('my_celery.orders.tasks.start_check_charging_status')
|
from my_celery.main import app import time
import logging log = logging.getLogger("django")
@app.task(name="send_sms") def send_sms(mobile): """发送短信""" print("向手机号%s发送短信成功!"%mobile) time.sleep(5)
return "send_sms OK"
@app.task def send_sms2(mobile): print("向手机号%s发送短信成功2!" % mobile) time.sleep(5)
return "send_sms2 OK"
|
2 使用
from django.shortcuts import render,HttpResponse from mycelery.sms.tasks import send_sms,send_sms2 from datetime import timedelta
from datetime import datetime def test(request):
return HttpResponse('ok')
|